package com.example.springbootdemo.service.serviceImp;

import com.example.springbootdemo.service.RocketMQHandelService;
import jdk.internal.joptsimple.internal.Strings;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.example.ProducerSingleton;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

/**
 * @ClassName: RocketMQHandelImpl
 * @Description:
 * @Author: TXW
 * @Date: 2025/7/22
 */
@Service
public class RocketMQHandelImpl implements RocketMQHandelService {

    @Resource
    private Producer producer;

    @Resource
    private ClientServiceProvider provider;


    private static final Logger log = LoggerFactory.getLogger(RocketMQHandelService.class);

    @Override
    public void sendNormalMessage(String topic, String tag, String messageContent, String[] keys){
        // Define your message body.
        byte[] body = messageContent.getBytes(StandardCharsets.UTF_8);
        Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys(keys)
                .setBody(body)
                .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send normal message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Send normal message Failed", e);
        }
    }

    /**
     * 创建顺序消息主题，并开启顺序消息功能 +message.type=FIFO
     * */
    @Override
    public void sendOrderMessage(String topic, String tag, String messageContent, String[] keys,String orderGroup) {
        byte[] body = messageContent.getBytes(StandardCharsets.UTF_8);
        Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys(keys)
                .setMessageGroup(orderGroup)
                .setBody(body)
                .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Send delay message Failed", e);
        }
    }


    /**
     * 创建事务消息主题，并开启事务消息功能 +message.type=TRANSACTION
     * */
    @Override
    public void sendTransactionMessage(String topic, String tag, String messageContent, String[] keys)  {
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        //构造事务生产者：事务消息需要生产者构建一个事务检查器，用于检查确认异常半事务的中间状态。
        Producer producer = null;
        try {
            producer = provider.newProducerBuilder()
                    .setTransactionChecker(messageView -> {
                        /**
                         * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚，此处以订单ID属性为例。
                         * 在订单表找到了这个订单，说明本地事务插入订单的操作已经正确提交；如果订单表没有订单，说明本地事务已经回滚。
                         */
                        final String orderId = messageView.getProperties().get("OrderId");
                        if (Strings.isNullOrEmpty(orderId)) {
                            // 错误的消息，直接返回Rollback。
                            return TransactionResolution.ROLLBACK;
                        }
                        return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                    }).build();
        } catch (ClientException e) {
            e.printStackTrace();
        }
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败，直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键，可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag，用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID，用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败，事务可以直接退出并回滚。
            return;
        }
        /**
         * 执行本地事务，并确定本地事务结果。
         * 1. 如果本地事务提交成功，则提交消息事务。
         * 2. 如果本地事务提交失败，则回滚消息事务。
         * 3. 如果本地事务未知异常，则不处理，等待事务消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试，如果放弃重试，可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息，回滚异常时可以无需重试，依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

    /**
     * 使用延时消息发送时需要创建延时主题以及开启延时消息功能 +message.type=DELAY
     * */
    @Override
    public void sendDelayMessage(String topic, String tag, String messageContent, String[] keys,Long delayTime) {
        byte[] body = messageContent.getBytes(StandardCharsets.UTF_8);
        Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys(keys)
                .setDeliveryTimestamp(delayTime)
                .setBody(body)
                .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Send delay message Failed", e);
        }
    }


    //演示demo，模拟订单表查询服务，用来确认订单事务是否提交成功。
    private static boolean checkOrderById(String orderId) {
        return true;
    }
    //演示demo，模拟本地事务的执行结果。
    private static boolean doLocalTransaction() {
        return true;
    }

}
